Skip to content

feat(orchestrator): refactor to overlap train/eval #2639

Open
mikasenghaas wants to merge 121 commits into
mainfrom
exp/orchestrator-v2
Open

feat(orchestrator): refactor to overlap train/eval #2639
mikasenghaas wants to merge 121 commits into
mainfrom
exp/orchestrator-v2

Conversation

@mikasenghaas
Copy link
Copy Markdown
Member

@mikasenghaas mikasenghaas commented May 26, 2026

Why

Three concrete wins over the previous prime_rl.orchestrator:

  1. Train + eval rollouts overlap. No more long tails on online evals. When the orchestrator ships a train batch that lands on an eval interval, it tells the dispatcher to flip to PREFER_EVALthe dispatcher stops scheduling new train rollouts but lets the in-flight train tail drain naturally while eval queues up. Conversely, when the eval queue empties the dispatcher starts scheduling train again while the eval tail finishes. Both transitions overlap on the same shared inflight counter — capacity stays saturated.

  2. Shared concurrency limiter across train + eval. A single inflight_permits counter (capacity = max_inflight_rollouts) and a single AsyncLimiter(tasks_per_minute) govern both kinds of rollouts. The pipeline always runs at the rate the inference infra can sustain — no separate budgets to tune and no idle inference capacity while one kind drains.

  3. Natural rollout / group / batch hierarchy amortizes the heavy per-batch post-processing. The old design did tokenization for an entire batch's worth of rollouts at once (asyncio.gather over interleave_rollout for all 128 rollouts) at ship time — a synchronization point that stalled the pipeline. The new design splits work by its natural scope:

    • rollout level — interleave_rollout / backfill_rollout_tokens happen as each rollout completes, overlapped with the dispatcher producing more rollouts.
    • group level — compute_advantages runs over one GRPO group at a time.
    • batch level — post_batch_filters + metrics build runs once per ship, on already-tokenized samples.

Summary

  • Replace the step-driven outer loop in prime_rl.orchestrator with an async-pipelined design driven by signal-based sinks. Clean deprecation — no flag, no parallel module, no compatibility shim.
  • The atomic unit is a single Rollout. Sinks own grouping; three uniform processing levels (process_rollout / process_group / process_batch) drive both train and eval flows.
  • The dispatcher schedules both train and eval rollouts through a single shared inflight_permits counter + shared AsyncLimiter(tasks_per_minute). Drain-switch overlap on each eval boundary recovers the cost of the previous "blocking eval" without cancelling in-flight work.
  • Every dispatched rollout reaches the sink exactly once. Success, env-error, empty-trajectory, task-exception, off-policy-cancel — all flow through out_q as Rollouts; the dispatcher synthesizes minimal vf.RolloutOutput markers for the failure paths. The sink owns drop / partial-train policy (filters errored rollouts; drops the whole group when the env requires_group_scoring and any failed).
  • Group / batch finalization is sink-derived by counting. The "last rollout in a group" is whichever straggler finishes last — not knowable at dispatch time — so no flags travel on the Rollout; each rollout carries a group_id (UUID, generated at dispatch) and the sink finalizes that group on the group_size-th arrival.
  • Example sourcing is its own abstraction. TrainSource (infinite weighted-round-robin pull across train envs) lives in train_source.py; EvalSource (trigger-driven finite-per-epoch pull with round-robin interleaving across fired envs) lives in eval_source.py. Both expose a symmetric next_example(available_permits) -> dict | None. The dispatcher just asks "what's the next example?" — it doesn't own dataset cursors, shuffling, or eval triggering.
  • Eval triggering happens in the orchestrator, per ship step. After each train batch ships, Orchestrator.finalize_train_batch calls eval_source.trigger(step) (which collapses startup, resume, and interval logic into one first_trigger flag) and flips the dispatcher to PREFER_EVAL if anything fires. The watcher's on_new_version is single-purpose: bump off-policy counters and cancel stale rollouts.
  • Pipeline drains at max_steps — no out-of-band final eval pass. When the final train batch ships, the orchestrator cancels all in-flight train rollouts, flips to drain mode, the dispatcher stops scheduling new train, and any interval-aligned eval at the final step completes through the normal pipeline before the main loop exits.
  • One unified PeriodicLogger on the orchestrator drives a single pipeline line: train batch fill (finalized-group survivors, 0→batch_size) plus a separate (+N buffered) count for rollouts sitting in not-yet-complete groups, per-active-eval-epoch progress, and total inflight split as (train=, eval= | per-env). Dispatcher / watcher gauges and event-loop lag go to wandb on the _timestamp axis (wall-clock, not step). No central log-loop task that pulls from everyone; no per-component logger to wire up.
  • Adds pre_batch_filters / post_batch_filters config fields. A @model_validator(mode="before") renames filterspost_batch_filters for TOML/CLI input (no warning); existing TOMLs keep parsing. Filters are train-only — the eval sink has no filter code by design.

Supersedes the prior brainstorming attempts in #2254 and #2336. Refs #2311.

Architecture

                                ┌──────────────────────────────────┐
                                │     WeightWatcher (task)         │
                                │ polls broadcasts/, on_new_version│
                                └────────────┬─────────────────────┘
                                             │ Policy.version, on_new_version (off-policy cancel)
                                             ▼
   ┌─────────────────┐         ┌──────────────────────────────────┐
   │  TrainSource    │ ──────► │       RolloutDispatcher          │  shared:
   │  (infinite RR)  │         │           (task)                 │   • inflight_permits counter
   └─────────────────┘         │  PREFER_TRAIN ⇄ PREFER_EVAL      │   • AsyncLimiter(tasks_per_minute)
                               │  off-policy cancel               │   • Policy (read at dispatch)
   ┌─────────────────┐         │  drain mode: stop scheduling     │
   │  EvalSource     │ ──────► │     new train (post-max_steps)   │
   │  (trigger-driven│         │  emits every dispatched rollout  │
   │   epoch queue)  │         │  (success or error marker)       │
   └────────▲────────┘         └────────────┬─────────────────────┘
            │ trigger(step)                 │ Rollout (raw.error optional)
            │                               ▼
            │                          asyncio.Queue (out_q)
            │                               │
            │           ┌───────────────────┴────────────────────┐
            │           ▼                                        ▼
            │ ┌────────────────────────┐         ┌───────────────────────────┐
            │ │      TrainSink         │         │       EvalSink            │
            │ │ process_rollout: tok-  │         │ process_rollout: no-op    │
            │ │   enize (interleave)   │         │                           │
            │ │ process_group: filter  │         │ process_group: bucket     │
            │ │   errors, advantages,  │         │   per-example into env    │
            │ │   pre_filters          │         │ process_batch: build per- │
            │ │ process_batch: post_   │         │   env reward/pass@k/      │
            │ │   filters + samples    │         │   turns/truncation        │
            │ │ finalizes by sink-     │         │   (EvalBatchMetrics)      │
            │ │   derived counts       │         │ finalizes by sink-derived │
            │ │                        │         │   counts                  │
            │ └───────────┬────────────┘         └───────────┬───────────────┘
            │             │ TrainBatch                       │ EvalBatch
            │             ▼                                  ▼
            │ ┌────────────────────────────────────────────────────────────────┐
            └─┤ Orchestrator main_loop / finalize_train_batch / finalize_eval_batch │
              │ ckpt save, ship to trainer, monitor.log, heartbeat, step++,        │  + PeriodicLogger
              │ fires eval triggers, drives drain on max_steps                     │    (unified "Pipeline" view:
              └────────────────────────────────────────────────────────────────┘     │     dispatcher + watcher +
                                                                                      │     sinks + event-loop lag)

Three background tasks (dispatcher.start(), watcher.start(), the orchestrator's PeriodicLogger) + the in-task main_loop under Orchestrator.start(). Event-loop lag comes from verifiers.utils.async_utils.EventLoopLagMonitor and feeds the pipeline view.

Verification runs

Run these one-by-one to exercise the new orchestrator end-to-end.

1. Basic training modes (rl / sft / opd) — student inference auto-launches on GPU 0; opd/sft need a teacher on GPU 1 first (rl needs no teacher):

# Teacher — only for opd + sft (skip for rl)
CUDA_VISIBLE_DEVICES=1 uv run inference \
  --model.name PrimeIntellect/Qwen3-0.6B-Reverse-Text-RL \
  --server.port 8001 --gpu-memory-utilization 0.5 --model.enforce-eager

uv run rl @ configs/debug/training_modes/rl.toml
uv run rl @ configs/debug/training_modes/opd.toml
uv run rl @ configs/debug/training_modes/sft.toml

2. Single-turn with resume (reverse-text; use the same --output-dir so resume_step=15 picks up the checkpoint):

uv run rl @ configs/ci/integration/reverse_text/start.toml  --output-dir outputs/reverse-text-resume
uv run rl @ configs/ci/integration/reverse_text/resume.toml --output-dir outputs/reverse-text-resume

3. Multi-env + eval (2 train envs, 2 eval envs with staggered eval intervals):

uv run rl @ configs/debug/multi_env/reverse_text.toml

4. Long-running basic run (Hendrycks sanity, 8 GPUs: 4 train / 4 infer):

uv run rl @ examples/hendrycks_sanity/rl.toml --wandb.project <project> --wandb.name <run>

5. Long-running evals (RLM-SWE Qwen3.5-4B-Thinking, multi-node SLURM: 1 train × 2 infer node):

uv run rl @ configs/rlm_swe/qwen35_4b.toml

Curves

examples/alphabet_sort/rl.toml (W&B)

Screenshot 2026-05-29 at 10 31 53 AM

examples/hendrycks_sanity/rl.toml (W&B)

Screenshot 2026-05-29 at 10 32 52 AM

configs/rlm_swe/qwen35_4b.toml (W&B)

Screenshot 2026-05-29 at 10 34 24 AM Screenshot 2026-05-29 at 10 34 10 AM

Config surface

Existing TOMLs in configs/ and examples/ keep parsing without changes or warnings.

Additive in packages/prime-rl-configs/.../orchestrator.py:

  • pre_batch_filters: list[FilterConfig] (default: gibberish, repetition, zero-advantage all in monitor mode).
  • post_batch_filters: list[FilterConfig] (default unchanged from the prior filters: gibberish + repetition monitor, zero-advantage enforce). A @model_validator(mode="before") silently renames TOML/CLI filterspost_batch_filters; runtime code only references post_batch_filters.
  • eval.skip_first_step: bool = False — when False (default) the orchestrator runs an eval epoch at step 0 before any train rollouts dispatch. Replaces the old eval_base_model (semantics inverted: False now means run startup eval).
  • LogConfig.interval: float = 10.0 (under [log]) — the period for the unified pipeline PeriodicLogger.
  • TrainEnvConfig.group_size: int — per-env override; a validator propagates the top-level group_size into each train env that doesn't override it.

Removed / renamed fields are enumerated under Breaking below.

Breaking

The Python module surface inside prime_rl.orchestrator changes (the orchestrator class, RolloutDispatcher, sinks, sources, etc. move to new files); external users importing those internals must update imports. The rl / orchestrator / sft CLIs are unchanged.

Removed config fields — a config that sets these now fails to parse (extra_forbidden):

  • orchestrator.seed — removed (was only consumed by the deleted buffer; no replacement).
  • orchestrator.eval.eval_base_model — replaced by orchestrator.eval.skip_first_step, with inverted semantics: eval_base_model = trueskip_first_step = false (the default, i.e. run the step-0 eval).
  • orchestrator.eval.skip_eval_on_resume (and its alias skip_eval_on_restart) — folded into skip_first_step; resume no longer re-fires an already-completed eval (deduped via last_eval_step_by_env).
  • orchestrator.eval.cancel_inflight_rollouts_on_eval — removed; the drain-switch overlap is the only eval transition mode.
  • ckpt.skip_buffer — removed (there is no buffer to skip).

Removed config fields — dropped with a FutureWarning (still parse):

  • [orchestrator.buffer] and every key inside it (seed, easy_threshold, hard_threshold, easy_fraction, hard_fraction, online_difficulty_filtering, hash_keys). Difficulty pools are gone. A @model_validator(mode="before") drops the block and warns; when online_difficulty_filtering = true was set, migrate to an enforced zero_advantage pre-batch filter:

    [[orchestrator.pre_batch_filters]]
    type = "zero_advantage"
    enforce = true

Renamed (backward compatible — old key still works via alias):

  • orchestrator.filtersorchestrator.post_batch_filters. A @model_validator(mode="before") aliases filters, so existing TOML/CLI keep parsing; runtime code only references post_batch_filters.

Behavior changes (field unchanged, semantics changed):

  • orchestrator.max_off_policy_steps now applies to eval rollouts as well as train (eval rollouts past the cap are cancelled).
  • Filters apply to train rollouts only — eval rollouts are no longer filtered.

Note

High Risk
Large architectural rewrite of orchestrator scheduling, checkpointing, and config surface (removed buffer/eval flags, new off-policy behavior for eval); incorrect migration or edge cases could affect training correctness and resume behavior.

Overview
Replaces the step-driven orchestrator with an async pipeline where train and eval rollouts share one max_inflight_rollouts budget and rate limiter. A new RolloutDispatcher schedules work in PREFER_TRAIN / PREFER_EVAL modes so eval can start while in-flight train work drains (no blocking eval-only mode). TrainSource / EvalSource feed examples; TrainSink / EvalSink finalize batches by counting arrivals per group_id (tokenization and advantages per rollout/group, not at ship time).

Config: drops [orchestrator.buffer] and difficulty-pool checkpointing; orchestrator.filters aliases to post_batch_filters; adds pre_batch_filters (train-only filtering). Removes orchestrator.seed, eval eval_base_modelskip_first_step (inverted), skip_eval_on_resume, cancel_inflight_rollouts_on_eval, ckpt.skip_buffer. max_off_policy_steps now cancels stale eval rollouts too. TrainEnvConfig.group_size per-env override with propagation from top-level group_size. LogConfig.interval for unified periodic pipeline logging.

Code: deletes buffer.py (~320 lines); checkpoints only progress.pt; assign_advantages is per-group; filters operate on TrainRollout wrappers. New modules: dispatcher.py, eval_source.py, eval_sink.py, metrics.py. Entrypoint calls run_orchestrator.

Configs: strip [orchestrator.buffer] from math/SWE examples; migrate online_difficulty_filtering to enforced pre_batch_filters where needed; add debug multi_env, multimodal, rlm_swe/qwen35_4b; consolidate/remove some multimodal and multi-reverse-text TOMLs. Registers rlm-swe env in pyproject.toml. Documents breaking changes in CHANGELOG.md.

Reviewed by Cursor Bugbot for commit 234adc2. Bugbot is set up for automated code reviews on this repo. Configure here.

Opt-in via `orchestrator.experimental.use_orch_v2 = true`. Replaces the
step-driven outer loop of `prime_rl.orchestrator` with an async pipeline
backed by a single dispatcher that schedules both train and eval rollouts
through a shared concurrency limiter and rate limiter.

The dispatcher uses a level-triggered `SchedMode.PREFER_TRAIN | PREFER_EVAL`:
on each eval-trigger boundary it stops scheduling new train rollouts but
lets the in-flight tail drain naturally, then resumes train as soon as the
eval queue empties. Both transitions overlap on the same semaphore, so the
pool stays saturated and we recover the cost of the current "blocking eval"
behavior without cancelling work.

Module layout (src/prime_rl/orchestrator_v2/):
- `policy.py` — shared mutable `Policy(version, model_name)`
- `watcher.py` — polls broadcasts, calls `update_weights`, notifies observers
- `dispatcher.py` — shared semaphore + AsyncLimiter, PREFER_TRAIN/PREFER_EVAL,
  off-policy cancellation, partial-group handling, per-env eval triggers
- `batcher.py` — pre_batch_filters → compute_advantages (in dispatcher) →
  batch buffer → post_batch_filters → tokenize → ship; eval aggregator with
  per-`eval_step` flush
- `log_loop.py` — `IntervalLogger` task writes dispatcher gauges +
  event-loop lag to wandb on the `_timestamp` axis (async-native, decoupled
  from step semantics)
- `ckpt.py` — lean `Progress(step, last_eval_step)` checkpoint manager
- `orchestrator.py` — wires everything in `asyncio.TaskGroup`, bounded
  graceful shutdown

Config evolution (additive, non-breaking):
- `pre_batch_filters` and `post_batch_filters` fields added.
- `filters` is silently aliased to `post_batch_filters` via
  `AliasChoices` — no deprecation warning, all existing TOMLs keep parsing.
- Default pre slot registers all three filter types in monitor mode;
  default post slot is unchanged from today's `filters` default
  (zero-advantage enforced, gibberish/repetition in monitor mode). Net
  out-of-the-box behavior matches the legacy orchestrator.
- `OrchestratorExperimentalConfig.use_orch_v2: bool = False` flips the
  `rl` entrypoint to launch `orchestrator-v2` instead of `orchestrator`.
- `OrchestratorExperimentalConfig.log_loop_interval: float = 5.0` controls
  the gauge cadence.
- `BufferConfig` and `eval.cancel_inflight_rollouts_on_eval` are still
  parsed (the legacy orchestrator still uses them) but v2 simply doesn't
  consume them. Difficulty pools are replaced by `pre_batch_filters`;
  eval transitions in v2 always use the drain-switch overlap.

Entrypoint wiring:
- New `orchestrator-v2` console script in `pyproject.toml`.
- `rl` launcher picks between `orchestrator` and `orchestrator-v2` based
  on the flag.
- New `configs/debug/orch_v2.toml` overlay flips the flag for parity
  testing against the same TOML.

Refs #2311.

Co-authored-by: Cursor <cursoragent@cursor.com>
@mikasenghaas mikasenghaas changed the title feat(orchestrator): orch v2 — shared train+eval scheduler with drain-switch overlap feat(orchestrator): orch v2 May 26, 2026
mikasenghaas and others added 13 commits May 26, 2026 19:45
- Make `Orchestrator` a proper class with `__init__` / `setup` / `start` /
  `stop` / `shutdown` methods. No more module-level `orchestrate` coroutine —
  the new `run_orchestrator` is a thin `@clean_exit`-wrapped helper that
  instantiates the class and awaits `start()`.
- Replace the `Done` exception with a shared `orch.stopped: asyncio.Event`.
  The batcher sets it when `max_steps` is reached; `Orchestrator.start()`
  blocks on it and then drives the shutdown sequence. No more
  exception-driven control flow / `except* Done` group-unwinding.
- Rename `run` → `start` across all async components (`RolloutDispatcher`,
  `TrainBatcher`, `WeightWatcher`, `IntervalLogger`).
- Slim `TrainBatcher.__init__` to a single `orch: Orchestrator` arg
  (previously 16 wiring args). The batcher reads shared state via
  `@property` accessors on `self.orch` — no behavior change, just less
  ceremony at construction time.
- Drop all leading-underscore "private" methods and attributes on the v2
  classes — we're not designing an API. Notably, `_handle_train`,
  `_ship_batch`, `_wait_barrier`, `_fill_inflight`, `_try_schedule_*`,
  `_schedule_group_rollout`, `_handle_completed_rollout`,
  `_maybe_finalize_group`, `_select_least_loaded_client`, `_acquire`,
  `_release`, `_drop_group`, `_apply_policy_update`, `_compute_next_ckpt_step`,
  etc. all lose the underscore. Internal state (`_stopped`, `_task`, etc.)
  too.
- Remove `MAX_EMPTY_BATCH_ATTEMPTS`. If a batch has zero trainable signal,
  the batcher logs a warning and just retries on the next batch instead
  of crashing after N consecutive empties.
- Remove `drain_pending_eval` from `TrainBatcher` and the matching
  `force_eval` from `RolloutDispatcher` — not a behavior we want to
  support. Final evals at end-of-run still run via the legacy
  `EvalEnv.evaluate` path (now called directly from `Orchestrator.start()`).
- Soften the barrier stall log: drop the speculative "Trainer may be
  stuck." sentence. Most of the time inference is just faster than the
  trainer, so the message was misleading. Bumped the first-warn threshold
  from 30s → 60s while we're there.
- Revert `uv.lock` to `main` (the previous commit picked up a stray
  `wikispeedia` workspace member from `uv sync --all-extras` that wasn't
  part of this PR).

Co-authored-by: Cursor <cursoragent@cursor.com>
…onents

Splits the previous catch-all ``TrainBatcher`` into four single-purpose
classes, each with explicit data-flow dependencies declared at construction
time. No more parent ``orch`` back-pointer / property-shim layer.

New module layout:

- ``batcher.py`` — ``Batcher``: pure in-memory accumulator with
  ``add / ready / pop / buffered_count``. Owns the pre-batch filters and the
  per-batch pre-filter counters. No I/O, no async, no step tracking.
- ``postprocessor.py`` — ``PostProcessor``: takes a popped batch + step
  number, runs post-batch filters, tokenizes, builds ``TrainingSample``\\ s,
  computes teacher logprobs (opd), and ships via the sender. Returns a
  ``ProcessResult`` carrying the per-rollout stats the metrics builder
  needs — so this class doesn't import pandas.
- ``eval_collector.py`` — ``EvalCollector``: buckets eval ``Trajectory``\\ s
  by ``eval_step``; flushes per-env reward / completion-len / pass@k metrics
  when the dispatcher's expected count comes back. Takes ``(expected,
  fired_envs)`` as call args instead of reaching into the dispatcher.
- ``metrics.py`` — ``MetricsBuilder``: assembles the per-step W&B dict from
  the rollout cohort + ``ProcessResult`` + dispatcher gauges. Pure-ish; only
  state is the config.

``Orchestrator`` now owns the pipeline driver loop:

- ``main_loop`` pulls trajectories off the dispatcher queue, routes train vs
  eval, drives ``batcher.add → batcher.pop`` and ``postprocessor.process``,
  builds metrics, logs to the monitor, and increments ``progress.step``.
- ``process_one_step`` is the single shipping step (ckpt save → max_steps
  check → pop → ship → barrier wait → metrics + log → step++).
- ``wait_barrier`` and ``maybe_save_ckpt`` are public helper methods on the
  orchestrator (no leading underscores; the v2 classes deliberately don't
  hide internals).

Dependency surface, post-refactor — every arrow is a real data-flow dep
declared at the call site, not a "shared context" smell:

  Batcher        ← config (batch_size / token_batch_size), pre_filters
  PostProcessor  ← config, tokenizer, renderer, mm_token_type_ids_mapping,
                   sender, teacher_inference, post_filters
  EvalCollector  ← config, monitor, eval_envs, post_filters
  MetricsBuilder ← config
  IntervalLogger ← dispatcher, policy, interval  (no more batcher refs)
  Orchestrator   ← owns all of the above

Also:

- Rename ``TrainBatcher`` → ``Batcher``: ``EvalCollector`` is the eval-side
  abstraction now, so the prefix is misleading.
- Drop the parent-pointer ``orch: Orchestrator`` constructor argument and
  the ``@property`` shim layer it forced (``dispatcher``, ``policy``,
  ``progress``, ``monitor`` etc. all read off ``self.orch`` previously).
- ``IntervalLogger`` no longer reads ``batcher.last_*`` cached state — those
  fields are gone. Step-aligned values plot fine via the per-step monitor
  log; the time-axis emit just covers dispatcher gauges + event-loop lag.
- Remove ``wikispeedia`` from ``pyproject.toml`` (stray pickup from a local
  ``uv sync --all-extras`` during dev).

Smoke: 8-step reverse-text run under v2 with the new layout, reward
0.11 → 0.24, clean event-based shutdown via ``self.stopped.set()`` at
``max_steps``.

Co-authored-by: Cursor <cursoragent@cursor.com>
…ush signals

Redesigns the data flow to be signal-based. The atomic unit emitted by the
dispatcher is now a single Rollout (one completed rollout, never a group).
Grouping moves into the sinks. Each sink owns its own flush signal:

- TrainSink groups rollouts by (env_name, example_id). When
  Rollout.is_group_complete=True arrives, the GRPO group is finalized:
  compute_advantages over the survivors, then apply_filters for the
  pre-batch pass, then drop filtered rollouts and extend batch_buf. The
  flush signal is batch_ready() — the orchestrator polls it after every
  add() and calls pop() to ship a batch.
- EvalSink groups rollouts by (env_name, eval_step). When
  Rollout.is_group_complete=True arrives, the env's eval epoch is done.
  add() returns an EvalFlush payload that the orchestrator logs.
  Filters do not apply to eval — filters are train-only by design.

Dispatcher changes:

- Trajectory (group of rollouts) replaced by Rollout (single rollout).
  maybe_finalize_group / emit_group push individual rollouts to out_q
  instead of one Trajectory per group.
- Train: the last surviving rollout of each (env, example_id) group gets
  is_group_complete=True.
- Eval: tracked per (env_name, eval_step) via eval_expected_per_env /
  eval_emitted_per_env / eval_dropped_per_env. The last expected rollout
  (emitted + dropped == expected) gets is_group_complete=True, flagging
  the env's epoch as done.
- compute_advantages is no longer called here — that's a train-sink
  concern now.
- Drops the cross-env expected_eval_counts dict and replaces it with
  per-env tracking.

Orchestrator changes:

- main_loop consumes Rollout and routes by kind. Train rollouts go to
  train_sink.add(); batch_ready() triggers process_one_step. Eval
  rollouts go to eval_sink.add(); an EvalFlush return triggers
  flush_eval (per-env metric logging on the orchestrator since it needs
  self.monitor and the logic is small).
- flush_eval is a new public method that builds per-env reward / seq-len
  / pass@k metrics from an EvalFlush and logs them via the monitor.
- post_filters is now a member field used by PostProcessor only; the
  eval path doesn't read it.

Removed:

- orchestrator_v2/batcher.py (replaced by train_sink.py)
- orchestrator_v2/eval_collector.py (replaced by eval_sink.py)

Smoke (eval.interval=5, max_steps=12, num_examples=16):

- Step 0 eval (eval_base_model) flushes cleanly: Reward 0.17.
- Step 5 eval triggers with inflight_train=121 — drain-switch overlap
  intact. Eval flushes per-env (Reward 0.54) while train continues
  (steps 6-8 ship during eval drain).
- Reward 0.14 → 0.60 over 12 steps; clean event-driven shutdown via
  self.stopped.set() at max_steps.
Both TrainSink and EvalSink now have a uniform three-level processing
structure. The dispatcher emits Rollout (atomic unit) with two boundary
signals; sinks dispatch each level to a named process function whose
logic naturally lives at that scope:

  Level    When                    Method                What it does
  -------- ----------------------- --------------------- -----------------------------
  rollout  every add()             process_rollout       train: backfill + interleave (eager tokenization)
                                                         eval:  no-op
  group    is_group_done=True      process_group         train: compute_advantages + pre_batch_filters
                                                         eval:  move per-example group into env batch bucket
  batch    batch_ready (train)     process_batch         train: post_batch_filters + assemble samples + metrics
           is_batch_done (eval)                          eval:  build per-env metrics (reward, pass@k, ...)

Dispatcher Rollout dataclass:

- is_group_complete split into two boundary signals:
  - is_group_done: last rollout of the (env, example_id) group. Set for
    both train (GRPO group) and eval (per-example group).
  - is_batch_done: last rollout of an eval env-epoch (emitted+dropped ==
    expected). Always False for train — the train sink decides batch
    boundaries itself from batch_size / token_batch_size.
- emit_group sets both flags appropriately per kind.

TrainSink:

- async add(rollout) returns True when the batch is ready to pop.
- process_rollout is the eager-tokenize hook. Per-rollout tokenization
  overlaps with the dispatcher producing more rollouts, removing the
  parallel-tokenize-all-at-ship-time sync point.
- process_group runs compute_advantages, propagates advantage / reward /
  env_name onto the already-tokenized TrainingSamples, then runs the
  pre-filter pass.
- pop_batch(step, progress, gauges, ...) runs process_batch internally
  and returns a TrainBatchResult { rollouts, samples, metrics, result }.
- Owns the MetricsBuilder reference; per-batch metrics are part of
  process_batch, not a separate orchestrator step.

EvalSink:

- add(rollout) returns an EvalBatchResult when an env-epoch is complete
  (is_batch_done arrived).
- process_rollout is a no-op (kept for structural symmetry with
  TrainSink).
- process_group buckets the per-example group into the (env, eval_step)
  batch.
- process_batch / build_metrics produces the eval/{env}/ metrics dict
  (reward, pass@k, etc) — no filter calls; filters are train-only.

Orchestrator:

- main_loop is a thin router: await train_sink.add(...) — if True, run
  process_one_step; eval_sink.add(...) — if EvalBatchResult, run
  log_eval_batch.
- process_one_step is now I/O-only: save_rollouts, offload_images,
  teacher_logprobs (opd), wait_barrier, sender.send, monitor.log,
  heartbeat, ckpt save, step++. All processing already happened in the
  sink.
- log_eval_batch is a small side-effect method: save_rollouts +
  monitor.log_eval_samples + monitor.log; metrics dict comes pre-built
  from EvalSink.
- postprocessor.py is gone — its work split across TrainSink (filter +
  tokenize + samples + metrics) and orchestrator (ship I/O + opd
  logprobs + disk side effects).
- ProcessResult moved into metrics.py to avoid a circular import.

Smoke (eval.interval=5, max_steps=12, num_examples=16):

- Step 0 eval flushes cleanly: Reward 0.14.
- Step 5 eval triggers with inflight_train=115 — drain-switch overlap
  intact. Train steps 6/7/8 ship during eval drain. Eval flushes
  per-env: Reward 0.48.
- Reward 0.13 → 0.64 over 12 steps; clean event-driven shutdown.
…nd eval

Both sinks now use the same flag — ``Rollout.is_batch_done`` — to signal the
batch boundary. The orchestrator's main loop polls a single, uniform
condition; the only difference is who stamps the flag:

- Eval: the dispatcher stamps ``is_batch_done=True`` on the last expected
  rollout of an env-epoch (emitted+dropped == expected). No change here.
- Train: ``TrainSink.add`` now stamps ``rollout.is_batch_done = True`` after
  ``process_group`` if its buffer has reached ``batch_size`` / token
  budget. The polling counterpart is ``TrainSink.is_batch_done()`` (renamed
  from ``batch_ready()`` for naming symmetry with the rollout flag).

Orchestrator main loop:

  await self.train_sink.add(rollout)
  while rollout.is_batch_done and not self.stopped.is_set():
      await self.process_one_step()
      rollout.is_batch_done = self.train_sink.is_batch_done()

Same predicate (``rollout.is_batch_done``) drives both sinks. The eval
path consumes the flag inside ``eval_sink.add``; the train path consumes
it in the main loop's ``while``.

Smoke: reverse-text 8-step run under v2, reward 0.13 → 0.36, clean
event-driven shutdown via ``self.stopped.set()`` at max_steps.
…wcase

- Drop ``configs/debug/orch_v2.toml``. v2 is opt-in via a CLI flag —
  ``--orchestrator.experimental.use-orch-v2`` (or the equivalent TOML
  key) — and a one-liner overlay TOML was overkill for that.
- Add ``configs/hendrycks_math/orch_v2_demo.toml`` — a small (2 GPU,
  30 train step) eval-heavy run that makes the v2 overlap win visible
  on wall clock. 6 AIME2024 eval epochs of 30 × 4 = 120 long-reasoning
  rollouts each; under the legacy orchestrator each epoch pauses train,
  under v2 they overlap. Comment block at the top of the file shows
  how to run both side-by-side via the CLI flag.
The eval boundary is a strict function of ``progress.step``, ``eval.interval``,
``eval_base_model``, and ``skip_eval_on_resume`` — no extra persistent state
needed. ``progress.last_eval_step`` was only ever written (in
``log_eval_batch``) and never read; the dispatcher's own
``last_eval_step_per_env`` dict handles "don't re-trigger at the resumed
step" and is seeded from ``resume_step`` directly.

The ``Progress`` dataclass on the ckpt now carries just ``step`` + totals.
Old checkpoints containing the extra ``last_eval_step`` field load fine
— the ``hasattr(progress, key)`` guard in ``CheckpointManager.load`` skips
unknown serialized fields silently.
…ypes

Rename `EvalBatchResult` → `EvalBatch`, `TrainBatchResult` → `TrainBatch`,
and rename `eval_step` → `step` on the eval payload. The dataclasses now
carry only the raw rollouts + per-rollout counters; the `metrics` dict is
a pure view computed at log time.

Why: the `metrics` field duplicated `rollouts` + a few scalars and forced
the orchestrator to build with a `step_time=0.0` placeholder inside
`TrainSink.pop_batch` and mutate the dict after the post-barrier wait.
Building at log time with the real timings keeps one source of truth.

- Drop `metrics` from both batch payloads; orchestrator calls
  `MetricsBuilder.build` (train) and `EvalSink.build_metrics` (eval) when
  ready to log.
- Drop `parallel_preprocess_time` / `teacher_logprobs_time` from
  `ProcessResult` (never read in v2 — tokenization is per-rollout;
  teacher logprobs is an orchestrator scalar passed to `build` directly).
- `TrainSink` no longer holds a `MetricsBuilder` ref; single-purpose
  ingest + payload assembly.

Co-authored-by: Cursor <cursoragent@cursor.com>
Move every dataclass / type alias / protocol from the orchestrator_v2
modules into a single ``types.py``. Behavioral modules (dispatcher, sinks,
watcher, log_loop, ckpt, metrics, orchestrator) now import their payloads
from one place instead of from each other.

Moved:
- ``Policy`` (was policy.py — file deleted, only contained the dataclass)
- ``Progress`` (was ckpt.py)
- ``Kind``, ``SchedMode``, ``Rollout``, ``RolloutMeta``, ``GroupState``
  (were dispatcher.py)
- ``EvalBatch`` (was eval_sink.py)
- ``TrainBatch`` (was train_sink.py)
- ``ProcessResult`` (was metrics.py)
- ``VersionObserver`` (was watcher.py)

Co-authored-by: Cursor <cursoragent@cursor.com>
Let train/eval rollouts use the full 8k max_model_len so the demo
showcases the overlap on realistic long-reasoning AIME completions
instead of capping them at 4k.

Co-authored-by: Cursor <cursoragent@cursor.com>
… sink

Previously the dispatcher stamped ``is_group_done`` / ``is_batch_done``
flags on whichever rollout it happened to iterate last in ``emit_group``.
But "last in a group" is whichever straggler finishes last — not knowable
at dispatch time — so the flag-on-rollout model was always derivative of
arrival ordering rather than a real signal.

Flip the design so the dispatcher emits every dispatched rollout (success,
env-error, empty-trajectory, task-exception, off-policy-cancel) exactly
once to ``out_q`` and the sinks own group/batch finalization by counting:

- Drop ``is_group_done`` / ``is_batch_done`` from ``Rollout``.
- ``TrainSink.add`` finalizes a group on the ``group_size``-th arrival,
  returns ``bool`` for batch-ready (was: mutate ``rollout.is_batch_done``).
- ``EvalSink.add`` finalizes a group on the ``group_size``-th arrival,
  finalizes the epoch when total arrivals reach
  ``num_examples * group_size``, returns ``EvalBatch | None``.
- Sinks decide drop / partial-train policy: filter errored rollouts before
  ``compute_advantages``; drop the whole group when the env
  ``requires_group_scoring`` and any rollout failed.
- Dispatcher synthesizes ``vf.RolloutOutput`` markers with ``error`` set
  for task exceptions and off-policy cancellations, so the sink's count-
  based finalization still fires when rollouts never produced real output.
- Drop ``eval_expected_per_env`` / ``eval_emitted_per_env`` /
  ``eval_dropped_per_env`` side-channel bookkeeping — pure dead weight
  once the sink counts arrivals itself.
- Drop ``GroupState.completed_rollouts`` / ``failed_rollouts`` — the
  dispatcher no longer accumulates rollouts before emit; ``emitted``
  counter is enough to pop the group from ``self.groups`` once every
  member has flowed out.

Co-authored-by: Cursor <cursoragent@cursor.com>
… check

The single-node SLURM launcher dumps the fully-resolved ``RLConfig`` to
disk and re-loads it inside the SLURM job. The dumped TOML has both the
shared-level keys (``output_dir``, ``model.name``, ``wandb.*``, ...) and
the matching sub-config keys (filled by the propagator on the original
load) populated to the same value. The old conflict check flagged
``shared is set, sub is also set`` regardless of values, so the re-load
failed with a ``Shared config conflicts with matching sub-config field(s)``
error.

Tighten the check to "shared and sub disagree" — matching values are a
harmless roundtrip (the propagator's fill-if-absent would have produced
the same thing). Genuine conflicts (e.g. shared ``model.name = "A"`` vs
``trainer.model.name = "B"``) still raise.

Covers the three sites that build ``conflicts``: the ``propagate``
helper, the ``output_dir`` block (matches against the per-target expected
value, including ``orchestrator.output_dir = f"{output_dir}/run_default"``),
and the ``wandb.name`` block (matches against the ``-trainer`` /
``-orchestrator`` suffixed form in non-shared mode).

Co-authored-by: Cursor <cursoragent@cursor.com>
…ation

Promote the v2 design to be the orchestrator. Clean deprecation — no
``use_orch_v2`` flag, no parallel ``orchestrator-v2`` console script, no
``prime_rl.orchestrator_v2`` module. The v2 sources move into
``prime_rl.orchestrator`` and the legacy files (``orchestrator.py``,
``scheduler.py``, ``buffer.py``, ``ckpt.py``) are deleted outright.

The legacy ``orchestrator`` console script keeps working — it now points
at the new ``Orchestrator(config).start()`` class. Existing TOML configs
work as-is (the public surface didn't move).

To compare against pre-v2 numbers, check out ``main`` and re-run any of
the existing configs head-to-head.

Co-authored-by: Cursor <cursoragent@cursor.com>
@mikasenghaas mikasenghaas changed the title feat(orchestrator): orch v2 feat(orchestrator): redesign as async-pipelined signal-based orchestrator May 27, 2026
mikasenghaas and others added 14 commits May 27, 2026 02:40
…lean dispatcher inference dep

Three small cleanups that came out of post-merge review of the
orchestrator rewrite:

1. **UUIDs for dispatcher group IDs.** ``RolloutMeta.group_id`` and
   ``self.groups`` are now keyed by ``uuid.UUID`` (drop the ``self.next_group_id``
   counter + ``new_group_id`` helper). One less piece of state to reason about
   across dispatcher restarts / resume paths and zero risk of stale-counter
   collision.

2. **Client selection moves onto ``InferencePool``.** ``client_identity`` is a
   module-level helper next to the pool definitions in ``utils/client.py`` and
   ``InferencePool.select_train_client(load)`` is the public hook (implemented
   on both ``StaticInferencePool`` and ``ElasticInferencePool``). The dispatcher
   builds the load ``Counter`` from its own in-flight tracking and calls the
   pool — load tracking stays caller-side, but "wait for clients + pick least
   loaded" is now pool-side where the clients actually live.

3. **Dispatcher takes ``inference: InferencePool`` directly.** The
   ``training_mode == "sft"`` resolution (``rollout_inference = teacher_inference
   if sft else student_inference``) moves up into ``Orchestrator.setup`` where
   both pools are already in scope; the dispatcher only knows about the one
   pool it actually uses.

Also drops a few stray "v2" mentions in log strings / proc title / docstrings
that survived the previous cleanup pass.

Co-authored-by: Cursor <cursoragent@cursor.com>
Roll back the propagator conflict-check loosening from this branch. The
single-node SLURM dump-then-reload bug it was working around is a
pre-existing issue on the launcher path; will be addressed in a separate
PR rather than ride along with this orchestrator rewrite.

Local launcher (the default ``rl_local`` path) is unaffected by the
validator either way.

Co-authored-by: Cursor <cursoragent@cursor.com>
…cher

Mirror the ``Train*`` / ``Eval*`` naming used everywhere else in the
orchestrator (``TrainEnvs`` / ``EvalEnvs``, ``TrainSink`` / ``EvalSink``,
``TrainBatch`` / ``EvalBatch``). Both abstractions answer the same
question — "what's the next example to schedule?" — with the asymmetry
their roles demand:

- ``TrainSource`` (was inline ``TrainEnvCycle``) — infinite pull. Same
  weighted round-robin over training envs as before; just renamed and
  moved into its own module.

- ``EvalSource`` (new) — trigger-driven finite-per-epoch pull. Owns the
  per-env example lists, per-env intervals, the per-(env, step) FIFO
  queue, ``last_eval_step_per_env``, ``eval_base_model`` startup
  handling, and ``skip_eval_on_resume`` gating. Exposes
  ``trigger(step)`` / ``trigger_at_start()`` for the dispatcher to poke,
  and ``peek`` / ``pop`` / ``bool(source)`` / ``len(source)`` for it to
  pull.

Dispatcher net loss: ~150 lines and 5 fields (``eval_queue``,
``eval_examples``, ``eval_intervals``, ``eval_step_envs``,
``eval_at_zero_pending``, ``last_eval_step_per_env``, ``resume_step``)
plus two methods (``fire_eval_epoch``, ``enqueue_eval_env``).
``RolloutDispatcher.on_new_version`` is now a 3-line delegation:
``fired = eval_source.trigger(step) ; if fired: switch_mode + log``.

Also drops ``eval_step_envs`` entirely — pure dead state: only ever
written, never read.

Co-authored-by: Cursor <cursoragent@cursor.com>
…tch, add eval cancellation

A bundle of orchestrator/dispatcher cleanups all sitting in this PR:

**dispatcher → just a scheduler**
- ``__init__`` takes explicit params (``train_source``, ``eval_source``,
  ``inference``, ``policy``, ``max_inflight_rollouts``,
  ``tasks_per_minute``, ``group_size``, ``max_off_policy_steps_train/eval``,
  ``training_mode``) instead of the full ``OrchestratorConfig``. Sources
  are now orchestrator-owned and passed in.
- Drops ``resume_step`` — it's purely an ``EvalSource`` concern now and
  the orchestrator passes it directly to ``EvalSource``.
- ``DispatcherMetrics`` dataclass on ``types.py`` consolidates all the
  per-poll counters (cancellation by kind, error/empty by env,
  errors-by-type, total arrivals, eval_epochs_started, mode_transitions)
  with ``record_*`` helpers and ``drained()`` that returns the wandb
  shape + clears the drain counters in one shot. Replaces 6 dict/int
  fields previously scattered on the dispatcher.
- Drops the redundant ``inflight_total_permits`` property (was just an
  alias for ``inflight_permits``).
- Unifies ``try_schedule_train`` + ``try_schedule_eval`` into a single
  ``try_schedule(kind)`` + ``next_fresh_group(kind, envs)``. Same
  "continue existing group → otherwise open a fresh one" algorithm for
  both; the only asymmetry (train pulls from an infinite source vs. eval
  peeks+pops a finite queue) sits in two clearly-labeled branches of
  ``next_fresh_group``.

**eval triggers move to per-batch (orchestrator-side)**
- ``Orchestrator.process_one_step`` calls ``maybe_trigger_eval(step)``
  at the end (after ``progress.step += 1``); ``Orchestrator.start``
  calls ``maybe_trigger_eval(at_start=True)`` once before the main loop.
- ``RolloutDispatcher.on_new_version`` now does only the off-policy
  bookkeeping (bump counters, cancel stale groups). Eval epoch
  triggering is no longer tied to "we just got new weights"; it's tied
  to "we just completed a training step", which matches the conceptual
  model.

**eval cancellation + cancelled-as-errored**
- ``max_off_policy_steps_eval: int | None = None`` (default uncapped —
  preserves current behavior) on ``OrchestratorConfig``. When set, eval
  rollouts older than the cap get cancelled via the same drain-and-
  switch path as train (synthetic ``Cancelled`` rollouts flow to the
  eval sink). Train and eval caps can now diverge.
- ``EvalSink.build_metrics`` filters errored rollouts out of all stats
  (reward/seq_len/pass@k) and reports them as ``cancelled_count`` /
  ``errored_count`` / ``valid_rate`` separately. Counting failed
  attempts as reward=0 would silently bias scores down.

**``eval.eval_base_model`` renamed to ``eval.skip_first_step``**
- New default is False (no step-0 eval — opt in instead of opt out).
  The legacy ``eval_base_model`` key still parses via a silent
  ``AliasChoices`` (same boolean semantics), so configs that explicitly
  set ``eval_base_model=true|false`` keep working. Configs that relied
  on the implicit default get the new no-step-0-eval behavior.

**misc**
- Drop ``OrchestratorConfig.seed`` (hardcoded ``seed=42`` for
  ``TrainSource`` in the orchestrator — was effectively the default
  anyway).
- Drop ``eval_step_envs`` dead state (write-only).
- Drop ``record_dropped_group`` from ``DispatcherMetrics`` — group
  dropping is a sink concern now (see prior commit).

Co-authored-by: Cursor <cursoragent@cursor.com>
…off_policy_steps

Drop the global ``OrchestratorConfig.max_off_policy_steps_eval`` field and
instead make off-policy caps a per-side override, matching the same
shared-default / per-side-override pattern used everywhere else in the
config (e.g. ``train.sampling`` vs per-env sampling):

- ``OrchestratorConfig.max_off_policy_steps`` (already existed, default
  8): the shared default for both train and eval.
- ``TrainConfig.max_off_policy_steps: int | None = None``: per-train
  override; ``None`` inherits the global.
- ``EvalConfig.max_off_policy_steps: int | None = None``: per-eval
  override; ``None`` inherits the global.

The orchestrator resolves both sides to plain ints before passing to the
dispatcher, so the dispatcher's signature tightens from ``int | None`` to
``int`` for ``max_off_policy_steps_eval`` (caps are never "unset" by the
time they reach the dispatcher — they either come from the per-side field
or fall back to the global default).

Behavior change worth flagging: the previous ``max_off_policy_steps_eval
| None = None`` defaulted to *uncapped* (eval was never cancelled). The
new pattern defaults eval to the same cap as train (``8`` by default),
so eval rollouts that lag past the cap will now be cancelled by default.
Set ``eval.max_off_policy_steps`` to a large value if the previous
uncapped behavior is preferred.
…_for→batch_size_for

Two follow-ups from review that didn't ride along the previous commit:

- ``EvalSink.process_batch`` now builds the per-env metrics dict inline
  and stamps it on the returned ``EvalBatch.metrics`` field. Was an
  awkward two-step (``process_batch`` returned just rollouts, then the
  orchestrator separately called ``build_metrics(batch)``); per-batch
  derivations belong in the per-batch hook. ``build_metrics`` as a
  standalone method goes away.

- ``EvalSource.expected_for`` → ``batch_size_for`` (it's the same number
  the train sink calls ``batch_size`` — total rollouts per epoch).
…cy_steps

Roll back ``TrainConfig.max_off_policy_steps`` / ``EvalConfig.max_off_policy_steps``.
Keep just the global ``OrchestratorConfig.max_off_policy_steps``; it applies
to train and eval rollouts the same way. Eval cancellation behavior
(introduced earlier in this branch — cancelled rollouts surface as
``Cancelled`` markers, sink filters them out of stats) stays in place;
this commit only reverts the per-side override scaffolding.
# Conflicts:
#	src/prime_rl/orchestrator/orchestrator.py
The ``SharedWandbConfig.deprecate_shared_field`` ``model_validator(mode="before")``
introduced in #2649 popped ``wandb.shared`` from input dicts with a
``FutureWarning`` so older configs that still set ``shared = true`` (or
``false``) kept loading.

The ``rl`` entrypoint always uses shared W&B mode now, the legacy split with
``-trainer`` / ``-orchestrator`` suffixes is gone, and the in-tree configs no
longer set ``shared``. Drop the shim so configs that still carry the field
fail loudly via the default ``extra = "forbid"`` instead of silently warning.

Docs and CHANGELOG updated.

Co-authored-by: Cursor <cursoragent@cursor.com>
pydantic-config's CLI negation (``--no-wandb`` / ``--no-ckpt``) writes the
string ``"None"`` into the override dict, which only becomes real ``None``
later via ``BaseConfig._none_str_to_none``. ``RLConfig``'s subclass
``mode="before"`` validators run *before* that conversion, so the bare-block
enablement in ``propagate_shared_fields`` used to see the string and
re-enable ``trainer.wandb`` / ``orchestrator.wandb`` (or the ``ckpt`` pair)
with defaults — silently undoing the CLI's intent.

Tighten the bare-block check from ``get(key) is not None`` to
``isinstance(get(key), dict)``. A bare ``[wandb]`` block in TOML is ``{}``
(dict, still enables); ``--no-wandb`` produces ``"None"`` (string, now
correctly skipped). Same logic applies to ``ckpt``.

Verified end-to-end with ``cli(RLConfig, args=[..., "--no-wandb"])``:
``cfg.wandb``, ``cfg.trainer.wandb``, ``cfg.orchestrator.wandb`` are all
``None``. ``--wandb None`` and ``--no-ckpt`` behave the same. Bare
``[wandb] {}`` / ``[ckpt] {}`` blocks still enable the sub-configs.

Co-authored-by: Cursor <cursoragent@cursor.com>
It's a bug fix, not a breaking config change, so it doesn't belong in
CHANGELOG.md (which documents breaking config changes only).

Co-authored-by: Cursor <cursoragent@cursor.com>
Co-authored-by: Cursor <cursoragent@cursor.com>
``docs/`` should describe the current state of the codebase, not its
history. Strip the "the legacy split has been removed" sentence from
``docs/training.md`` — readers on ``main`` don't care that there used to
be a non-shared mode — and add a ``## Docs`` rule to ``AGENTS.md`` to
codify it.

Co-authored-by: Cursor <cursoragent@cursor.com>
mikasenghaas and others added 6 commits May 29, 2026 11:00
num_infer_nodes 1->2 and inference dp 8->16 (8/node x 2 nodes, tp=1).
data_parallel_size_local auto-derives to 8. Total alloc now 3 nodes
(1 train + 2 infer). Relieves the rollout-throughput bottleneck that
makes SWE batch fill (~36min/step) slow.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
dp in multi-node inference is per-node (data_parallel_size_local),
multiplied by num_infer_nodes internally. dp=16 double-counted →
inference_world_size=32. dp=8 × 2 infer nodes = 16 global ranks.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
num_infer_nodes=2 builds ONE inference instance spanning 2 nodes
(cross-node DP coordinator → slow/fragile rendezvous). For a 4B model
on 1 GPU we want 2 independent single-node replicas behind the router:
num_infer_nodes=1 + num_infer_replicas=2 (dp=8/node → 16 infer GPUs).
Same 2x throughput, no cross-node coordination.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
start()'s teardown now logs a forced-cleanup warning when main_loop is
interrupted (signal CancelledError / KeyboardInterrupt / error) instead
of the clean-exit success line, so an interrupted run is distinguishable
from a true completion.
print_benchmark was unused in the orchestrator package (the trainers use
the copy in trainer/utils.py); drop it and its now-orphaned pandas / rich
/ format_time / Any imports.
@mikasenghaas mikasenghaas marked this pull request as ready for review May 29, 2026 19:47
@mikasenghaas mikasenghaas requested a review from samsja May 29, 2026 19:47
Comment thread packages/prime-rl-configs/src/prime_rl/configs/orchestrator.py
Comment thread src/prime_rl/orchestrator/orchestrator.py
Comment thread src/prime_rl/orchestrator/train_source.py
@mikasenghaas mikasenghaas changed the title feat(orchestrator): overlap train/eval feat(orchestrator): refactor to overlap train/eval May 29, 2026
…off-by-one

- Eval rollouts now always go to the student inference pool + student
  policy name. Previously the dispatcher used the single rollout pool for
  both train and eval, so in SFT mode (rollout pool = teacher) eval
  evaluated the *teacher* (fixed ~0.8), masking student progress. Add a
  dedicated eval_inference pool to the dispatcher; train still uses the
  rollout pool (teacher in SFT).
- maybe_save_ckpt: skip step >= max_steps - 1 (was == max_steps - 1) so
  the drain-entry step (== max_steps, never shipped) doesn't double-save
  a checkpoint at an interval boundary.
- CHANGELOG: document the orchestrator breaking config changes.
Comment thread src/prime_rl/orchestrator/metrics.py
- MetricsBuilder: solve_all now compares each problem's summed reward to
  its own env's group_size (was the top-level group_size), fixing
  solve_all/effective_batch_size for per-env group_size overrides.
- Group all per-problem metrics by the rollout's group_id (UUID) instead
  of (env_name, example_id), which collides across envs / oversampling.
- alphabet_sort integration test: run 10 steps (was 5).
Finish moving the color-codeword multimodal configs out of the
configs/multimodal/ dir: drop the unused rl_color_codeword.toml /
rl_color_codeword_test.toml and trim the renamed
configs/debug/multimodal.toml header. The dir is now empty.
Eval rollouts were generated via select_train_client (the renderer/token
train client) instead of get_eval_client (chat-completions). That changed
the eval generation path vs the legacy orchestrator (which used the chat
eval client), producing a consistent eval-score offset. Route eval groups
through pool.get_eval_client() so eval matches the legacy path; train
still uses the renderer/token train client.
Comment thread src/prime_rl/orchestrator/ckpt.py Outdated
Comment thread configs/wiki_search/rl.toml
…runs

- Drop Progress.last_eval_step_by_env (and its EvalSource plumbing). It
  only suppressed a redundant eval epoch when resuming exactly at an
  eval-interval step; the first_trigger/is_resumed guard already prevents
  re-running the startup eval on resume. Progress is now back to main's
  shape (step + totals).
- Rename the orchestrator checkpoint file state.pt -> progress.pt to match
  the previous orchestrator. With the Progress shape restored, resuming via
  ckpt.resume_step from an old checkpoint now works (no FileNotFoundError,
  no migration needed).
Comment thread src/prime_rl/orchestrator/eval_source.py
Replaces the ship-time async barrier with a dispatcher-side gate owned by
the orchestrator. The old wait_barrier blocked main_loop, which is also the
only consumer of dispatcher.out_q — when on_new_version (called by the
watcher under update_lock) emitted enough Cancelled markers to fill the
queue, its out_q.put blocked, the watcher never advanced policy.version,
and the barrier never released. With max_off_policy_steps=0 every version
bump cancels every inflight rollout, so the queue routinely overflowed.

The dispatcher now exposes a dispatch_allowed asyncio.Event; fill_inflight
returns early when clear. The orchestrator toggles it via the public
update_dispatch_gate based on (progress.step + 1) - policy.version vs
TARGET_LAG. It runs after sender.send (step advances) and from a new
on_new_version observer hook (policy advances). main_loop never blocks, so
on_new_version's puts always have a consumer.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Comment thread src/prime_rl/orchestrator/dispatcher.py
drop_group used to interleave dict mutations with awaits on emit_rollout
(which yields when out_q is full). During a yield, handle_completed_rollout
could pop a task drop_group hadn't reached yet, leaving the snapshot stale.
drop_group then re-popped (returning None — discarded), double-released the
permits, and emitted a second set of Cancelled markers for that rollout.

Split into a sync claim sweep (pop + release, no awaits) followed by the
async emit loop. By the time we start awaiting, every dropped task is
already out of self.inflight, so handle_completed_rollout's existing
None-guard bails out cleanly. No defensive None-checks in drop_group itself.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@mikasenghaas mikasenghaas requested a review from hallerite May 30, 2026 01:38
mikasenghaas and others added 3 commits May 30, 2026 12:48
- enable_prefix_caching=true ([inference] field): big win for SWE's long
  shared multi-turn prefixes.
- language_model_only=true (via [inference.vllm_extra]; it's a vLLM
  MultiModalConfig arg, no prime-rl field): Qwen3.5-4B is a VL model, so
  skip the vision tower for text-only SWE.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Set [orchestrator.renderer] name="qwen3.5", enable_thinking=false
(Qwen35RendererConfig field) — text-only SWE, no <think> traces. Was
relying on the auto renderer (default thinking on for 4B).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The on-main config is the non-thinking version (enable_thinking=false)
+ prefix caching + language_model_only. Run length 100 -> 400.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Comment thread src/prime_rl/orchestrator/dispatcher.py
Comment thread src/prime_rl/orchestrator/dispatcher.py
Flip enable_thinking back to true on the on-main qwen3.5 config (keeps
prefix caching + language_model_only + 400 steps).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Copy link
Copy Markdown

@cursor cursor Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Fix All in Cursor

❌ Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.

Reviewed by Cursor Bugbot for commit 234adc2. Configure here.

# ``start()``, so an interval boundary at max_steps doesn't double-save.
near_end = self.config.max_steps is not None and step >= self.config.max_steps - 1
if near_end:
return 0.0
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Checkpoint skipped one step too early at max_steps

Medium Severity

The near_end guard in maybe_save_ckpt uses step >= self.config.max_steps - 1, which skips checkpointing at the second-to-last step (max_steps - 1). The comment says both steps "defer to the final save in start()", but the final save writes at progress.step = max_steps (after the increment), not at max_steps - 1. These are different checkpoint steps with different content, so there is no double-save risk at max_steps - 1. The condition loses one intermediate checkpoint that the user's ckpt.interval / keep_interval would otherwise preserve.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit 234adc2. Configure here.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant